Cloud Functionsの重複実行について、Firestoreを使った対策を考えてみた
クラスメソッド株式会社データアナリティクス事業本部所属のニューシロです。
今回はGoogle CloudのサービスであるCloud Functionsの重複実行について、同じくGoogle CloudのサービスであるFirestoreを使った対策を考えてみました。
前提
Cloud Functionsの重複実行とは
Cloud Functionsのイベントトリガーを使用すると、特定のイベントの発生に対してCloud Functionsを起動させることができます。
しかし、イベントトリガーCloud Functionsは1回のイベントに対し2回以上起動してしまうことがあります。
重複実行が発生しても問題がないように、Cloud Functionsではベストプラクティスとしてべき等関数を作成することが挙げられています。
詳しくは以下ブログをご参照ください。
Firestoreとは
FirestoreとはGoogle Cloudが提供するNoSQLドキュメントデータベースです。
詳しくは以下ブログをご参照ください。
今回はこのFirestoreとCloud Functionsを組み合わせて、Cloud Functionsの重複実行の対策を考えていきたいと思います。
本題
今回想定するCloud Functionsの実行ケース
今回は、Cloud SchedulerでCloud Functionsを定期実行するケースを考えます。
Cloud Schedulerは、フルマネージドのcronジョブサービスです。
Cloud Scheduler では、作業単位のスケジュールを設定して、定義した回数または一定の間隔で実行できます。これらの作業単位は、一般的に cron ジョブと呼ばれています。代表的な使い方としては、レポートメールを毎日送信する、10 分間隔でキャッシュ データを更新する、1 時間に 1 回要約情報を更新する、などがあります。
Cloud SchedulerでPub/Subへcronジョブを送信したことをトリガーに、Pub/SubトリガーCloud Functionsが起動する想定です。
方針①イベントIDで重複を判別する
Cloud Functions重複実行の対策について、方針を2つ考えてみます。
重複実行のパターンとして、まずPub/Sub側で重複が発生する場合があります。
Pub/Sub側で重複が発生する場合
図のように、イベントトリガーで用いられているPub/Sub pushサブスクリプションの性質によりメッセージを複数回配信してしまうことがあります。 この場合、重複して実行されるイベントは同じイベントIDを持つと考えられます。
よって、FirestoreにイベントIDを記録しておき、Cloud Functions実行時はFirestoreに同一イベントIDがあるかどうかを確認し、同一イベントIDがある場合はCloud Functionsの実行をスキップするといった方法が考えられます。
このイベントIDで重複を確認する方針は、公式ドキュメントにも記載があります。
コードとは関係なく、トランザクション チェックをサービスの外側に置きます。たとえば、指定されたイベント ID がすでに処理されたことを記録している場所の状態を保持します。
良い方針ですが、今回想定するケースはCloud Schedulerがイベント発生元ですので、その性質も考慮してみます。 今回使用するCloud Schedulerは、少なくとも 1 回実行されるcronジョブですので、Pub/Sub同様こちらも1回の実行で2回以上ジョブが実行されてしまう可能性があります。
Cloud Scheduler は、「少なくとも 1 回」を基本に処理を行うよう設計されています。つまり、ジョブはスケジュールされた実行ごとに少なくとも 1 回実行されます。まれに、スケジュールの 1 つのインスタンスに関連してジョブが複数回実行される可能性があるため、コードで繰り返し実行されても有害な副作用が生じないようにする必要があります。
このようにCloud Scheduler側で重複が発生する場合も考えてみましょう。
Cloud Scheduler側で重複が発生する場合
この場合、Pub/Sub側がそれぞれ別のメッセージと解釈するため、異なるイベントIDが付与されると考えられます。 そうすると、イベントIDで重複を確認する方法では対応できません。
異なるイベントIDに対しても重複実行の対応ができるよう、今回は別の方針を考えてみます。
方針②(今回の方針)Pub/Subがメッセージを受信した時点のタイムスタンプで重複を判別する
イベントトリガーCloud Functionsでは、イベントの送受信にPub/Subを使用しています。
Pub/Subメッセージには、Pub/Subがメッセージを受信した時点のタイムスタンプが付与されているため、こちらを利用する方針を考えてみます。
Pub/Subメッセージサンプル
{ "message": { "attributes": { "key": "value" }, "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==", "messageId": "2070443601311540", "message_id": "2070443601311540", "publishTime": "2021-02-26T19:13:55.749Z", "publish_time": "2021-02-26T19:13:55.749Z" }, "subscription": "projects/myproject/subscriptions/mysubscription" }
- Pub/Subメッセージサンプル引用元:プッシュ サブスクリプション | Cloud Pub/Sub ドキュメント | Google Cloud - push エンドポイントによるメッセージの受信方法
このpublish_time(publishTime)
を利用して、前回のpublish_time
と今回のpublish_time
を比較し、短時間に複数メッセージが送信されている場合は2回目以降の実行はスキップするといった対策が考えられます。
時間差をコード上で計算することにより、イベントIDが同一の場合でも異なる場合でも対応できそうです。 今回はこちらの方針で考えてみたいと思います。
Firestore実装
データベース作成
以下の設定でデータベースを作成します。
- モード:ネイティブモード
- データベースID:(default)
- 本番環境ルール
Firestoreの設定方法など、詳しくは以下ブログをご参照ください。
Cloud Functions実装
設定
今回利用したCloud Functionsの設定です。
- 第2世代
- Pub/Subトリガー
- Python3.11
Pub/Subトリガーを用いるため、あらかじめトリガー元のPub/Subトピックも作成しておきます。
起動元であるCloud Schedulerも作成します。
サービスアカウントの権限
Cloud Functionsで使用したサービスアカウントには、roles/datastore.user(Cloud Datastore ユーザー)
を付与してあります。
コード
Cloud Functionsに実装したコードです。
import base64 import os from datetime import datetime, timedelta import functions_framework from google.cloud import firestore MY_PROJECT_ID = os.getenv("MY_PROJECT_ID") # 環境変数からプロジェクト名を取得 def is_duplicate_execution(current_publish_time: str, last_publish_time: str) -> bool: # datetime型へ変換 current_publish_dt = datetime.strptime(current_publish_time, "%Y-%m-%dT%H:%M:%S.%fZ") last_publish_dt = datetime.strptime(last_publish_time, "%Y-%m-%dT%H:%M:%S.%fZ") # 時間差を計算し60秒以内ならTrueを返す time_diff = current_publish_dt - last_publish_dt print(f"time_diff: {time_diff}") max_diff_seconds = 60 return time_diff <= timedelta(seconds=max_diff_seconds) @functions_framework.cloud_event def avoid_duplicate_execution(cloud_event) -> None: # Pub/Subメッセージ情報を取得 pubsub_message = cloud_event.data["message"] publish_time = pubsub_message["publish_time"] # Firestoreから前回のPub/Subメッセージ情報を取得 db = firestore.Client(project=MY_PROJECT_ID) doc_ref = db.collection("cloud-functions").document("avoid-duplicate-execution") doc_dict = doc_ref.get().to_dict() # 最初の実行なら重複確認はスキップ if not doc_dict: print("First execution.") # 重複実行であれば処理終了 elif is_duplicate_execution(publish_time, doc_dict["publish_time"]): print("Duplicate execution.") return # Firestore更新用データ data = { "publish_time": publish_time, "message": base64.b64decode(pubsub_message["data"]).decode() } # Firestoreを更新 doc_ref.set(data) # ======================================== # # ここにCloud Functionsで実行したい処理を書く # # ======================================== print("Function was executed.")
functions-framework==3.* google-cloud-firestore
前回のpublish_time
と今回のpublish_time
を比較し、時間差が60秒以内ならCloud Functionsの処理をスキップする仕様です。
期待通りの挙動をとるか検証してみます。
Cloud Scheduler実行
Cloud Schedulerを実行してみます。その後、コンソール上でCloud LoggingとFirestoreを確認していきます。
1回目の実行
Cloud Schedulerを実行します。1回目はFirestoreに過去の実行情報がありませんので、重複確認はしません。
今回の実行によりFirestoreへ書き込みが行われ、次回の実行から重複確認が行われるようになります。
Function was executed.
がログに出力されていることから、Cloud Functionsで処理が実行されていることがわかります。
Firestoreに1回目の処理の情報が書き込まれていることもわかります。
前回の実行から約2分後に実行
前回の実行から約2分後にCloud Schedulerを再度実行します。
こちらは重複確認が行われ、前回の実行から60秒以上経過しているためCloud Functionsで処理が実行される想定です。
Function was executed.
がログに出力されていることから、Cloud Functionsで処理が実行されていることがわかります。
こちらは重複確認も行われており、変数time_diff
の出力結果から2分程度の時間差があることも確認できます。
Firestoreの情報も更新されています。
前回の実行から約30秒後に実行
約30秒後にCloud Schedulerを再度実行します。
前回の実行から60秒以内のため、Cloud Functionsは処理はスキップしてくれる想定です。
前回の実行との時間差が60秒以内のため、重複実行と判定しています。
ログにDuplicate execution.
と出力されていること、またFunction was executed.
と出力されていないことから、Cloud Functionsの処理がスキップされていることがわかります。
Cloud Schedulerを短時間に2回実行
実際の重複実行を想定したパターンとして、Cloud Schedulerを手動で短時間に2回実行してみます。
2回目の実行(オレンジ色の枠)の変数time_diff
がとても小さい数値であることから前回の実行との時間差がほとんどないことがわかりますが、それでも重複実行と判定しCloud Functionsの処理をスキップしてくれています。
全て期待していた挙動です!以上で検証は終了です。
注意
今回、かなり短い時間差で2回Cloud Schedulerを起動した際も重複実行と判定してくれましたが、1回目と2回目の処理起動の時間差があまりに小さい場合は重複とみなされない可能性があると考えられます(Firestoreへほぼ同時にアクセスした場合)。
ただ、Firestoreが想定よりも早い処理時間でしたので、極端に小さい時間差でなければ問題はなさそうです。
また、今回はpublish_time
のみで重複実行かどうかを判別しているため、同時間帯にはCloud Functionsが複数回起動しません。
Cloud Schedulerのメッセージ内容でCloud Functionsの処理を分岐させたい場合などは、例えばメッセージ内容から抽出したパラメーターも重複かどうかの判断材料に加えることで対応できるかと思います。
最後に感想
このような設計をせずともべき等関数になっている方がもちろん良いと思いますが、どうしても難しい場合は本記事のような手法もありなのかなと思いました。
今回の実装については個人で考えたものですので、あくまでアイディアの一つとして読んでいただいた方々の参考になれば良いなと思います。